package org.codehaus.activemq.store.bdb;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryCursor;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.Transaction;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * A {@link TopicMessageStore} backed by Berkeley DB databases.
 *
 * <p>Message storage is inherited from {@link BDbMessageStore}; this class adds a
 * separate subscription database which maps a subscription's persistent key to the
 * sequence-number key of the last message recorded as acknowledged for it.
 */
public class BDbTopicMessageStore extends BDbMessageStore
  implements TopicMessageStore
{
  private static final Log log = LogFactory.getLog(BDbTopicMessageStore.class);

  /** Maps subscription persistent key to last acknowledged sequence-number key; set to null by {@link #stop()}. */
  private Database subscriptionDatabase;

  /**
   * Creates the store.
   *
   * @param database passed through to the {@link BDbMessageStore} constructor
   * @param secondaryDatabase passed through to the {@link BDbMessageStore} constructor
   * @param secondaryConfig passed through to the {@link BDbMessageStore} constructor
   * @param sequenceNumberCreator passed through to the {@link BDbMessageStore} constructor
   * @param wireFormat passed through to the {@link BDbMessageStore} constructor
   * @param subscriptionDatabase database holding the last acknowledged entry per subscription
   */
  public BDbTopicMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat, Database subscriptionDatabase)
  {
    super(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat);
    this.subscriptionDatabase = subscriptionDatabase;
  }

  /**
   * No-op in this store.
   *
   * @param messageId ignored
   */
  public void incrementMessageCount(MessageIdentity messageId)
  {
  }

  /**
   * No-op in this store.
   *
   * @param messageIdentity ignored
   * @param ack ignored
   */
  public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack)
  {
  }

  /**
   * Records the given message as the last acknowledged one for the subscription.
   *
   * @param subscription the subscription whose persistent key is used as the record key
   * @param messageIdentity the message whose sequence-number key is stored
   * @throws JMSException if the underlying database write fails (or from {@code checkClosed()})
   */
  public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
    checkClosed();
    try {
      doSetLastAcknowledgedMessageIdentity(subscription, messageIdentity);
    }
    catch (DatabaseException e) {
      throw JMSExceptionHelper.newJMSException("Failed to update last acknowledge messageID for : " + messageIdentity + ". Reason: " + e, e);
    }
  }

  /**
   * Replays to the subscription every message stored after its last acknowledged one.
   *
   * <p>The last acknowledged key is looked up (and, when absent, initialised from
   * {@code lastDispatchedMessage} if that is non-null). A secondary cursor is positioned
   * on that key and every following record is extracted and handed to
   * {@link Subscription#addMessage}. If no last acknowledged key is available nothing
   * is replayed.
   *
   * @param subscription the subscription to recover
   * @param lastDispatchedMessage used to seed the last acknowledged entry when none exists; may be null
   * @throws JMSException if a database or message extraction error occurs
   */
  public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException
  {
    checkClosed();
    SecondaryCursor cursor = null;
    try {
      DatabaseEntry lastAckKey = getLastAcknowledgedMessageID(subscription, lastDispatchedMessage);
      if (lastAckKey != null) {
        cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
        DatabaseEntry valueEntry = new DatabaseEntry();
        OperationStatus status = cursor.getSearchKey(lastAckKey, valueEntry, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS) {
          log.error("Could not find the last acknowledged record for: " + subscription + ". Status: " + status);
        }
        else {
          // The cursor now sits on the last acknowledged record; step past it and
          // deliver everything that follows.
          status = cursor.getNext(lastAckKey, valueEntry, LockMode.DEFAULT);
          while (status == OperationStatus.SUCCESS) {
            ActiveMQMessage message = extractMessage(valueEntry);
            subscription.addMessage(getContainer(), message);
            status = cursor.getNext(lastAckKey, valueEntry, LockMode.DEFAULT);
          }
          // NOTFOUND is the normal end of iteration; anything else is unexpected.
          if (status != OperationStatus.NOTFOUND) {
            log.warn("Strange result when iterating to end of collection: " + status);
          }
        }
      }
    }
    catch (DatabaseException e)
    {
      throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: " + subscription + ". Reason: " + e, e);
    }
    catch (IOException e)
    {
      throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: " + subscription + ". Reason: " + e, e);
    }
    finally
    {
      if (cursor != null) {
        try {
          cursor.close();
        }
        catch (DatabaseException e) {
          // Best effort: a failure to close must not mask the primary outcome.
          log.warn("Caught exception closing cursor: " + e, e);
        }
      }
    }
  }

  /**
   * Returns the identity of the last record in the secondary database.
   *
   * @return a {@link MessageIdentity} carrying the key of the last record, or
   *         {@code null} if the database is empty or the lookup returned an
   *         unexpected status (which is logged as an error)
   * @throws JMSException if the underlying database access fails
   */
  public MessageIdentity getLastestMessageIdentity() throws JMSException
  {
    checkClosed();
    SecondaryCursor cursor = null;
    try {
      cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
      DatabaseEntry keyEntry = new DatabaseEntry();
      DatabaseEntry valueEntry = new DatabaseEntry();
      OperationStatus status = cursor.getLast(keyEntry, valueEntry, LockMode.DEFAULT);
      if (status == OperationStatus.SUCCESS) {
        if (log.isDebugEnabled()) {
          log.debug("Loaded last sequence number of: " + BDbHelper.longFromBytes(keyEntry.getData()));
        }
        return new MessageIdentity(null, keyEntry);
      }
      // An empty database (NOTFOUND) is a normal condition; only other statuses are errors.
      if (status != OperationStatus.NOTFOUND) {
        log.error("Could not find the last sequence number. Status: " + status);
      }
      return null;
    }
    catch (DatabaseException e)
    {
      throw JMSExceptionHelper.newJMSException("Unable to load the last sequence number. Reason: " + e, e);
    }
    finally {
      if (cursor != null) {
        try {
          cursor.close();
        }
        catch (DatabaseException e) {
          // Best effort: a failure to close must not mask the primary outcome.
          log.warn("Caught exception closing cursor: " + e, e);
        }
      }
    }
  }

  /**
   * Not implemented by this store.
   *
   * @param info ignored
   * @return always {@code null}
   * @throws JMSException never thrown by this implementation
   */
  public SubscriberEntry getSubscriberEntry(ConsumerInfo info)
    throws JMSException
  {
    return null;
  }

  /**
   * No-op in this store.
   *
   * @param info ignored
   * @param subscriberEntry ignored
   * @throws JMSException never thrown by this implementation
   */
  public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException
  {
  }

  /**
   * Closes the subscription database, then stops the parent store.
   *
   * <p>A failure reported while closing the subscription database is held back until
   * {@code super.stop()} has run, and is then rethrown wrapped in a new exception.
   *
   * @throws JMSException if closing the subscription database failed, or from {@code super.stop()}
   */
  public synchronized void stop() throws JMSException {
    JMSException firstException = BDbPersistenceAdapter.closeDatabase(this.subscriptionDatabase, null);
    this.subscriptionDatabase = null;
    super.stop();
    if (firstException != null) {
      throw JMSExceptionHelper.newJMSException("Unable to close the subscription database: " + firstException, firstException);
    }
  }

  /**
   * Looks up the stored last acknowledged entry for a subscription.
   *
   * <p>When no entry exists and {@code lastDispatchedMessage} is non-null, an entry
   * is created from it and returned.
   *
   * @param subscription the subscription whose persistent key is looked up
   * @param lastDispatchedMessage fallback used to initialise a missing entry; may be null
   * @return the stored (or newly written) value, or {@code null} if there is none
   *         or the lookup returned an unexpected status (which is logged as a warning)
   * @throws DatabaseException if the database read or write fails
   */
  protected DatabaseEntry getLastAcknowledgedMessageID(Subscription subscription, MessageIdentity lastDispatchedMessage)
    throws DatabaseException
  {
    DatabaseEntry key = createKey(subscription.getPersistentKey());
    DatabaseEntry value = new DatabaseEntry();
    // NOTE(review): this read passes a null transaction and lock mode, unlike the
    // write path which uses BDbHelper.getTransaction() - confirm this is intended.
    OperationStatus status = this.subscriptionDatabase.get(null, key, value, null);
    if (status == OperationStatus.SUCCESS) {
      return value;
    }
    if (status == OperationStatus.NOTFOUND)
    {
      if (lastDispatchedMessage != null) {
        return doSetLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
      }
    }
    else {
      log.warn("Unexpected status return from querying lastAcknowledgeSequenceNumber for: " + subscription + " status: " + status);
    }
    return null;
  }

  /**
   * Writes the last acknowledged entry for a subscription using the transaction
   * obtained from {@code BDbHelper.getTransaction()}.
   *
   * @param subscription the subscription whose persistent key is the record key
   * @param messageIdentity the message whose sequence-number key becomes the record value
   * @return the value that was written
   * @throws DatabaseException if the database write fails
   */
  protected DatabaseEntry doSetLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws DatabaseException {
    Transaction transaction = BDbHelper.getTransaction();
    DatabaseEntry key = createKey(subscription.getPersistentKey());
    DatabaseEntry value = getSequenceNumberKey(messageIdentity);
    this.subscriptionDatabase.put(transaction, key, value);
    return value;
  }
}